{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "CI = False"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Skip this notebook execution in CI because it hits non-public buckets\n",
    "# `CI` is assigned in the first cell, which carries the \"parameters\" tag so an\n",
    "# external runner can override it -- TODO confirm which tool injects the value.\n",
    "if CI:\n",
    "    import sys\n",
    "\n",
    "    # sys.exit() raises SystemExit; presumably the CI runner treats that as a stop -- verify\n",
    "    sys.exit()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import daft"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Reading Parquet Files\n",
    "\n",
    "## Read all data in Parquet files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = daft.read_parquet(\n",
    "    \"s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "df.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Read one column only in Parquet files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = daft.read_parquet(\n",
    "    \"s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/\"\n",
    ")\n",
    "df = df.select(\"L_ORDERKEY\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "df.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Read filtered data from Parquet files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = daft.read_parquet(\n",
    "    \"s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/\"\n",
    ")\n",
    "df = df.where(df[\"L_ORDERKEY\"] < 100)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "df.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Reading **many small files**\n",
    "\n",
    "## Listing Files\n",
    "\n",
    "Let's compare naive listing using the Python boto3 library against Daft's S3 listing capabilities.\n",
    "\n",
    "Listing many small files (100k+) is a notoriously expensive operation in S3, but Daft provides an extremely efficient solution for common cases with hierarchical file namespaces."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "import boto3\n",
    "\n",
    "client = boto3.client(\"s3\")\n",
    "kwargs = {\"Bucket\": \"daft-public-datasets\", \"Prefix\": \"tpch-lineitem/10k-1mb-csv-files\"}\n",
    "\n",
    "# Naive listing: request one page of keys at a time and follow the continuation\n",
    "# token until a response no longer includes one.\n",
    "data = []\n",
    "while True:\n",
    "    response = client.list_objects_v2(**kwargs)\n",
    "    # \"Contents\" is missing from a response that matched no keys, so default to []\n",
    "    data.extend(response.get(\"Contents\", []))\n",
    "    token = response.get(\"NextContinuationToken\")\n",
    "    if token is None:\n",
    "        break\n",
    "    kwargs[\"ContinuationToken\"] = token\n",
    "\n",
    "print(f\"Retrieved {len(data)} results.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "df = daft.from_glob_path(\"s3://daft-public-datasets/tpch-lineitem/10k-1mb-csv-files/**/*.csv\")\n",
    "df.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Reading 10K small 1MB CSV files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = daft.read_csv(\"s3://daft-public-datasets/tpch-lineitem/10k-1mb-csv-files/**/*.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(df.num_partitions())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "df.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Downloading data from URLs\n",
    "\n",
    "In many unstructured/complex data workloads, you will often have URLs in your table that point to external data. Daft is extremely fast at downloading this data, much faster than anything I've ever managed to build using boto3 and Python."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# List the validation images; the download below reads the resulting \"path\" column.\n",
    "# NOTE(review): this glob is \"**.jpg\" on bucket \"daft-public-data\", while the CSV cells use\n",
    "# \"**/*.csv\" on \"daft-public-datasets\" -- confirm both the pattern and the bucket name are intended.\n",
    "df = daft.from_glob_path(\"s3://daft-public-data/open-images/validation-images/**.jpg\")\n",
    "\n",
    "# S3 settings handed to the download call: max_connections=64\n",
    "IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(max_connections=64))\n",
    "# Add a \"data\" column produced by downloading each URL found in \"path\"\n",
    "df = df.with_column(\"data\", df[\"path\"].url.download(io_config=IO_CONFIG))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "df.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Running a full TPC-H Query\n",
    "\n",
    "Let's see what this profiling looks like for running a full TPC-H query (Q1, which aggregates the `lineitem` table) on Parquet files stored in S3."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "import daft\n",
    "\n",
    "PARQUET_FOLDER = \"s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/\"\n",
    "\n",
    "\n",
    "def get_df(table_name: str) -> daft.DataFrame:\n",
    "    \"\"\"Read every `*.parquet` file under `PARQUET_FOLDER/<table_name>/` into a DataFrame.\"\"\"\n",
    "    # Build the S3 URL with \"/\" explicitly: os.path.join uses the local OS separator,\n",
    "    # which would put a backslash into the URL on Windows.\n",
    "    return daft.read_parquet(f\"{PARQUET_FOLDER.rstrip('/')}/{table_name}/*.parquet\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import datetime\n",
    "\n",
    "from daft import col\n",
    "\n",
    "lineitem = get_df(\"lineitem\")\n",
    "\n",
    "# Reusable expressions: extended price after discount, and that price with tax applied\n",
    "discounted_price = col(\"L_EXTENDEDPRICE\") * (1 - col(\"L_DISCOUNT\"))\n",
    "taxed_discounted_price = discounted_price * (1 + col(\"L_TAX\"))\n",
    "# Filter by ship date, aggregate per (L_RETURNFLAG, L_LINESTATUS), then sort by the group keys.\n",
    "# NOTE(review): this cell only builds `df`; no later cell calls `df.collect()`,\n",
    "# so the query is never executed or timed -- confirm whether a `%%time` cell is missing.\n",
    "df = (\n",
    "    lineitem.where(col(\"L_SHIPDATE\") <= datetime.date(1998, 9, 2))\n",
    "    .groupby(col(\"L_RETURNFLAG\"), col(\"L_LINESTATUS\"))\n",
    "    .agg(\n",
    "        col(\"L_QUANTITY\").alias(\"sum_qty\").sum(),\n",
    "        col(\"L_EXTENDEDPRICE\").alias(\"sum_base_price\").sum(),\n",
    "        discounted_price.alias(\"sum_disc_price\").sum(),\n",
    "        taxed_discounted_price.alias(\"sum_charge\").sum(),\n",
    "        col(\"L_QUANTITY\").alias(\"avg_qty\").mean(),\n",
    "        col(\"L_EXTENDEDPRICE\").alias(\"avg_price\").mean(),\n",
    "        col(\"L_DISCOUNT\").alias(\"avg_disc\").mean(),\n",
    "        col(\"L_QUANTITY\").alias(\"count_order\").count(),\n",
    "    )\n",
    "    .sort([\"L_RETURNFLAG\", \"L_LINESTATUS\"])\n",
    ")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": "3.10.9"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
